Kafka Consumers
What is a Kafka Consumer?
A Kafka Consumer is a client application that reads (consumes) messages from Kafka topics. Consumers subscribe to topics and process the stream of records published to those topics. They are responsible for reading data from Kafka brokers and processing it according to application logic.
Key Characteristics of Kafka Consumers
1. Consumer Groups
- Consumers are organized into consumer groups identified by group.id
- Within a group, each partition is assigned to exactly one consumer
- Provides load balancing and fault tolerance
- Enables parallel processing of messages (see the sketch after this list)
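A minimal sketch of group membership in action (assuming a local broker at localhost:9092 and a multi-partition topic named my-topic): run two copies of the program below with the same group.id, and each instance is assigned a disjoint subset of the topic's partitions.
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class GroupMembershipDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // The group join and partition assignment happen inside poll()
            consumer.poll(Duration.ofSeconds(5));
            System.out.println("Assigned partitions: " + consumer.assignment());
        }
    }
}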
2. Offset Management
- Consumers track their position using offsets
- Offsets are committed to Kafka or external storage
- Enables resuming consumption from last position
- Supports both automatic and manual offset management
3. Polling Model
- Consumers use a polling mechanism to fetch messages
- poll() blocks for at most the timeout you pass, keeping the application thread in control of the loop
- Configurable poll intervals and batch sizes
- Supports both synchronous and asynchronous processing
Consumer Configuration
Essential Configuration Properties
# Bootstrap servers
bootstrap.servers=localhost:9092
# Consumer group
group.id=my-consumer-group
# Deserialization
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Offset management
auto.offset.reset=earliest
enable.auto.commit=true
auto.commit.interval.ms=5000
# Performance settings
fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
Key Configuration Parameters
Parameter | Description | Default | Recommended |
---|---|---|---|
group.id | Consumer group identifier | - | Required for group functionality |
auto.offset.reset | Offset reset policy | latest | earliest for new groups |
enable.auto.commit | Auto-commit offsets | true | false for manual control |
session.timeout.ms | Session timeout | 10000 | 30000-60000 |
heartbeat.interval.ms | Heartbeat interval | 3000 | 10000 |
max.poll.records | Max records per poll | 500 | 500-1000 |
Consumer Implementation Examples
Basic Consumer Example (Java)
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
// Consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to topic
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
// Poll for records
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process records
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
// Process the message
processMessage(record);
}
// Commit offsets (if auto-commit is disabled)
// consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processMessage(ConsumerRecord<String, String> record) {
// Implement your message processing logic here
System.out.println("Processing: " + record.value());
}
}
Consumer with Manual Offset Management
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class ManualOffsetConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-offset-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// Process message
processMessage(record);
// Commit the offset for this record (per-record commits are safest but slow; committing once per poll batch is more common)
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
consumer.commitSync(offsets);
} catch (Exception e) {
// Handle processing error
System.err.println("Error processing message: " + e.getMessage());
// Don't commit offset - will retry on next poll
}
}
}
} finally {
consumer.close();
}
}
}
Consumer Patterns and Best Practices
1. Batch Processing
public class BatchProcessor {
private final KafkaConsumer<String, String> consumer;
private final int batchSize;
private final Duration pollTimeout;
public BatchProcessor(String topic, int batchSize) {
this.batchSize = batchSize;
this.pollTimeout = Duration.ofMillis(1000);
Properties props = new Properties();
// ... consumer configuration
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
}
public void processBatch() {
ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
if (!records.isEmpty()) {
List<String> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
batch.add(record.value());
}
// Process the accumulated batch
handleBatch(batch);
// Commit offsets only after the whole batch has been processed
consumer.commitSync();
}
}
private void handleBatch(List<String> batch) {
// Implement your batch processing logic here
System.out.println("Handling batch of " + batch.size() + " records");
}
}
2. Consumer with Dead Letter Queue
public class DeadLetterQueueConsumer {
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> dlqProducer;
public void processWithDLQ() {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
} catch (Exception e) {
// Send the failed record to the Dead Letter Queue topic
ProducerRecord<String, String> dlqRecord =
new ProducerRecord<>("dlq-topic", record.key(), record.value());
dlqProducer.send(dlqRecord);
// Log error
System.err.println("Message sent to DLQ: " + record.value());
}
}
// Commit after the batch: successfully processed and DLQ-routed records are both acknowledged
consumer.commitSync();
}
}
3. Consumer with Schema Registry (Avro)
public class AvroConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");
// User is the class generated from the Avro schema registered for this topic
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
User user = record.value();
System.out.println("User: " + user.getName() + ", Age: " + user.getAge());
}
}
}
}
Offset Management Strategies
1. Auto-Commit (Default)
// Enable auto-commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
Pros:
- Simple configuration
- Automatic offset management
- Good for simple applications
Cons:
- May commit offsets before processing
- Risk of message loss on failures
- Less control over commit timing
2. Manual Commit
// Disable auto-commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Manual commit after processing
consumer.commitSync(); // Synchronous commit
consumer.commitAsync(); // Asynchronous commit
Pros:
- Full control over commit timing
- Ensures processing before commit
- Better error handling
Cons:
- More complex implementation
- Risk of duplicate processing
- Manual offset management
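A common compromise, sketched below, combines the two: commitAsync() keeps the poll loop fast, and a final commitSync() on shutdown makes sure the last processed offsets are recorded. The snippet assumes the consumer, a running flag, and the processMessage helper from the earlier examples.
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processMessage(record);
        }
        // Non-blocking commit; failures are only reported through the callback
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Async commit failed for " + offsets + ": " + exception);
            }
        });
    }
} finally {
    try {
        // Blocking commit on shutdown so the final position is not lost
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}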
3. External Offset Storage
public class ExternalOffsetConsumer {
private final KafkaConsumer<String, String> consumer;
private final OffsetStore offsetStore; // Database, Redis, etc.
public void processWithExternalOffsets() {
// Assign the partition explicitly; seek() only works on assigned partitions
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));
// Get the last committed offset from external storage
long lastOffset = offsetStore.getLastOffset("my-topic", 0);
// Seek to the stored position
consumer.seek(partition, lastOffset);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record);
// Store offset externally
offsetStore.saveOffset("my-topic", 0, record.offset() + 1);
}
}
}
}
Consumer Group Management
1. Consumer Group Rebalancing
public class RebalancingConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// ... basic configuration
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe with rebalance listener
consumer.subscribe(Collections.singletonList("my-topic"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Commit offsets before partitions are revoked
consumer.commitSync();
System.out.println("Partitions revoked: " + partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
}
});
// Poll as usual after subscribing; the rebalance callbacks are invoked from inside poll()
}
}
2. Static Membership
// Enable static membership for stable consumer group
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
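For static membership to work, every instance needs its own stable ID; a minimal sketch, assuming the instance name is injected through an environment variable (for example a Kubernetes StatefulSet pod name) rather than hard-coded:
// Assumption: POD_NAME uniquely and stably identifies this consumer instance
String instanceId = System.getenv().getOrDefault("POD_NAME", "consumer-1");
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId);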
Error Handling and Monitoring
1. Consumer Error Handling
public class RobustConsumer {
public void processWithErrorHandling() {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
} catch (ProcessingException e) {
// Handle processing errors
handleProcessingError(record, e);
} catch (Exception e) {
// Handle unexpected errors
handleUnexpectedError(record, e);
}
}
} catch (WakeupException e) {
// Handle consumer shutdown
break;
} catch (Exception e) {
// Handle poll errors
handlePollError(e);
}
}
}
}
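The WakeupException branch above only works if some other thread calls consumer.wakeup(), the one consumer method that is safe to call concurrently. A minimal sketch wiring this to a JVM shutdown hook (it assumes consumer is the consumer used by the loop above and that this code runs on the same thread as that loop):
final Thread pollThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // Makes the blocked poll() throw WakeupException so the loop can exit cleanly
    consumer.wakeup();
    try {
        // Wait for the poll loop to finish committing and closing the consumer
        pollThread.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));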
2. Consumer Metrics
Key metrics to monitor (a sketch for reading them programmatically follows this list):
- Records consumed rate: Messages per second
- Records lag: Messages behind latest offset
- Fetch latency: Time to fetch messages
- Commit latency: Time to commit offsets
- Rebalance rate: Frequency of rebalancing
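These values can also be read programmatically: consumer.metrics() exposes the client's internal metrics. A minimal sketch that prints a few of them (the metric names used here, such as records-lag-max, are standard client metric names; verify them against the client version you run):
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
import java.util.Set;

public class ConsumerMetricsLogger {
    private static final Set<String> WATCHED = Set.of(
            "records-consumed-rate", "records-lag-max", "fetch-latency-avg", "commit-latency-avg");

    public static void logKeyMetrics(KafkaConsumer<String, String> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            // Some names appear in several metric groups (e.g. per-partition), so multiple lines may print
            if (WATCHED.contains(entry.getKey().name())) {
                System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}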
3. Consumer Health Checks
public class ConsumerHealthCheck {
public boolean isHealthy(KafkaConsumer<String, String> consumer) {
try {
// Check that the consumer can still poll; note that poll() has side effects -
// it may consume records and (with auto-commit enabled) commit their offsets
consumer.poll(Duration.ofMillis(100));
return true;
} catch (Exception e) {
return false;
}
}
public Map<TopicPartition, Long> getConsumerLag(KafkaConsumer<String, String> consumer) {
Map<TopicPartition, Long> lag = new HashMap<>();
Set<TopicPartition> partitions = consumer.assignment();
for (TopicPartition partition : partitions) {
long currentOffset = consumer.position(partition);
long endOffset = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
lag.put(partition, endOffset - currentOffset);
}
return lag;
}
}
Performance Optimization
1. Poll Configuration
// Optimize polling
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB minimum
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // Max records per poll
2. Memory Management
// Memory settings
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50MB
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB per partition
3. Threading Models
// Single-threaded consumer
public class SingleThreadedConsumer {
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record);
}
}
}
}
// Multi-threaded consumer
public class MultiThreadedConsumer {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Hand off to the pool; per-partition ordering is lost, and offsets may be
// committed (e.g. by auto-commit) before these tasks have finished
executor.submit(() -> processMessage(record));
}
}
}
}
Security Considerations
1. Authentication
// SASL/PLAIN authentication (prefer SASL_SSL in production so credentials are not sent in cleartext)
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";");
2. Encryption
// SSL/TLS encryption
props.put(ConsumerConfig.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "password");
Best Practices Summary
- Use consumer groups for load balancing and fault tolerance
- Configure appropriate auto.offset.reset based on requirements
- Implement proper error handling with retry logic
- Monitor consumer lag to ensure timely processing
- Use manual offset management for critical applications
- Implement proper resource cleanup by closing consumers
- Handle rebalancing gracefully with rebalance listeners
- Configure security for production environments
- Test consumer behavior under various failure scenarios
- Monitor consumer metrics for performance and health
Kafka Consumers are responsible for reading and processing messages from Kafka topics. Understanding their configuration, patterns, and best practices is essential for building reliable and scalable data processing applications.